POC of integration receiver and processor #96

Draft: jsoriano wants to merge 23 commits into main

Conversation

@jsoriano (Member) commented Aug 20, 2024

Summary

This POC revisits the idea proposed in open-telemetry/opentelemetry-collector-contrib#26312: using a receiver to render templates as a mechanism to distribute reusable configurations as part of integrations. In this POC, templates are closer to OTel collector configuration files, with placeholders for variables.

This POC includes the following components:

  • internal/integrations, helper code used by the other components to work with integrations.
  • extension/fileintegrationextension, an extension that other components can use to obtain integrations from local disk.
  • extension/configintegrationextension, an extension that can be used to embed integration templates in the main OTel collector configuration (see the sketch after this list).
  • processor/integrationprocessor, a processor composed by instantiating the processors defined in an integration.
  • receiver/integrationreceiver, a receiver composed by instantiating the receivers and processors defined in an integration.
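
As a rough illustration of the config extension, embedding an integration template directly in the collector configuration might look like the sketch below. The extension ID (config_integrations) and its integrations key are assumptions made for this sketch, not necessarily the schema implemented in this PR; the embedded template follows the same format as the file-based example further below.

extensions:
  config_integrations:             # hypothetical ID and keys, for illustration only
    integrations:
      couchbase:                   # embedded template, same format as a file-based integration
        receivers:
          prometheus:
            config:
              scrape_configs:
                - job_name: 'couchbase'
                  static_configs:
                    - targets: ${var:endpoints}
        pipelines:
          metrics:
            receiver: prometheus
            processors: []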

Open questions:

  • Should we explicitly declare dependencies of each integration?
  • Should we version integrations? Would it depend on the source?
  • Do we need to support extensions in integrations, or is it OK if the user needs to define them and pass their IDs as parameters?

Purpose and use-cases of the new components

The main purpose of integrations is to provide an abstraction to share configurations intended to observe specific applications and services.

Templates are interpreted by components created for this purpose. Two main components are implemented: a receiver and a processor. Both can use extensions to discover integrations.

These components are intended to be used with minimal configuration, so users can leverage integrations without extensive knowledge of the OTel collector. At minimum, users need to provide one or more sources of integrations (configured as extensions), the name of the integration, and the parameters it needs.

Using a receiver as the main component for integrations also makes it possible to leverage any other feature built for receivers, such as autodiscovery.

The main use case is the distribution of configurations for specific services. These configurations include receivers and processors, as well as pipelines indicating how the receivers and processors have to be combined. A single integration can include multiple pipelines; users can choose which ones to use.

Find below an example integration, mostly copied from the one in open-telemetry/opentelemetry-collector#8372.

receivers:
  prometheus:
    config:
      scrape_configs:
        - job_name: 'couchbase'
          scrape_interval: 5s
          static_configs:
            - targets: ${var:endpoints}
          basic_auth:
            username: ${var:username}
            password: ${var:password}
          metric_relabel_configs:
            # Include only a few key metrics
            - source_labels: [ __name__ ]
              regex: "(kv_ops)|\
                (kv_vb_curr_items)|\
                (kv_num_vbuckets)|\
                (kv_ep_cursor_memory_freed_bytes)|\
                (kv_total_memory_used_bytes)|\
                (kv_ep_num_value_ejects)|\
                (kv_ep_mem_high_wat)|\
                (kv_ep_mem_low_wat)|\
                (kv_ep_tmp_oom_errors)|\
                (kv_ep_oom_errors)"
              action: keep

processors:
  filter:
    # Filter out prometheus scraping meta-metrics.
    metrics:
      exclude:
        match_type: strict
        metric_names:
          - scrape_samples_post_metric_relabeling
          - scrape_series_added
          - scrape_duration_seconds
          - scrape_samples_scraped
          - up

  metricstransform:
    transforms:
      # Rename from prometheus metric name to OTel metric name.
      # We cannot do this with metric_relabel_configs, as the prometheus receiver does not
      # allow metric renames at this time.
      - include: kv_ops
        match_type: strict
        action: update
        new_name: "couchbase.bucket.operation.count"
      - include: kv_vb_curr_items
        match_type: strict
        action: update
        new_name: "couchbase.bucket.item.count"
      - include: kv_num_vbuckets
        match_type: strict
        action: update
        new_name: "couchbase.bucket.vbucket.count"
      - include: kv_ep_cursor_memory_freed_bytes
        match_type: strict
        action: update
        new_name: "couchbase.bucket.memory.usage.free"
      - include: kv_total_memory_used_bytes
        match_type: strict
        action: update
        new_name: "couchbase.bucket.memory.usage.used"
      - include: kv_ep_num_value_ejects
        match_type: strict
        action: update
        new_name: "couchbase.bucket.item.ejection.count"
      - include: kv_ep_mem_high_wat
        match_type: strict
        action: update
        new_name: "couchbase.bucket.memory.high_water_mark.limit"
      - include: kv_ep_mem_low_wat
        match_type: strict
        action: update
        new_name: "couchbase.bucket.memory.low_water_mark.limit"
      - include: kv_ep_tmp_oom_errors
        match_type: strict
        action: update
        new_name: "couchbase.bucket.error.oom.count.recoverable"
      - include: kv_ep_oom_errors
        match_type: strict
        action: update
        new_name: "couchbase.bucket.error.oom.count.unrecoverable"
      # Combine couchbase.bucket.error.oom.count.x and couchbase.bucket.memory.usage.x
      # metrics.
      #- include: '^couchbase\.bucket\.error\.oom\.count\.(?P<error_type>unrecoverable|recoverable)$$'
      #  match_type: regexp
      #  action: combine
      #  new_name: "couchbase.bucket.error.oom.count"
      - include: '^couchbase\.bucket\.memory\.usage\.(?P<state>free|used)$$'
        match_type: regexp
        action: combine
        new_name: "couchbase.bucket.memory.usage"
      # Aggregate "result" label on operation count to keep label sets consistent across the metric datapoints
      - include: 'couchbase.bucket.operation.count'
        match_type: strict
        action: update
        operations:
          - action: aggregate_labels
            label_set: ["bucket", "op"]
            aggregation_type: sum

  transform:
    metric_statements:
    - context: datapoint
      statements:
        - convert_gauge_to_sum("cumulative", true) where metric.name == "couchbase.bucket.operation.count"
        - set(metric.description, "Number of operations on the bucket.") where metric.name == "couchbase.bucket.operation.count"
        - set(metric.unit, "{operations}") where metric.name == "couchbase.bucket.operation.count"

        - convert_gauge_to_sum("cumulative", false) where metric.name == "couchbase.bucket.item.count"
        - set(metric.description, "Number of items that belong to the bucket.") where metric.name == "couchbase.bucket.item.count"
        - set(metric.unit, "{items}") where metric.name == "couchbase.bucket.item.count"

        - convert_gauge_to_sum("cumulative", false) where metric.name == "couchbase.bucket.vbucket.count"
        - set(metric.description, "Number of non-resident vBuckets.") where metric.name == "couchbase.bucket.vbucket.count"
        - set(metric.unit, "{vbuckets}") where metric.name == "couchbase.bucket.vbucket.count"

        - convert_gauge_to_sum("cumulative", false) where metric.name == "couchbase.bucket.memory.usage"
        - set(metric.description, "Usage of total memory available to the bucket.") where metric.name == "couchbase.bucket.memory.usage"
        - set(metric.unit, "By") where metric.name == "couchbase.bucket.memory.usage"

        - convert_gauge_to_sum("cumulative", true) where metric.name == "couchbase.bucket.item.ejection.count"
        - set(metric.description, "Number of item value ejections from memory to disk.") where metric.name == "couchbase.bucket.item.ejection.count"
        - set(metric.unit, "{ejections}") where metric.name == "couchbase.bucket.item.ejection.count"

        - convert_gauge_to_sum("cumulative", true) where metric.name == "couchbase.bucket.error.oom.count"
        - set(metric.description, "Number of out of memory errors.") where metric.name == "couchbase.bucket.error.oom.count"
        - set(metric.unit, "{errors}") where metric.name == "couchbase.bucket.error.oom.count"

        - set(metric.description, "The memory usage at which items will be ejected.") where metric.name == "couchbase.bucket.memory.high_water_mark.limit"
        - set(metric.unit, "By") where metric.name == "couchbase.bucket.memory.high_water_mark.limit"

        - set(metric.description, "The memory usage at which ejections will stop that were previously triggered by a high water mark breach.") where metric.name == "couchbase.bucket.memory.low_water_mark.limit"
        - set(metric.unit, "By") where metric.name == "couchbase.bucket.memory.low_water_mark.limit"

pipelines:
  metrics:
    receiver: prometheus
    processors:
      - filter
      - metricstransform
      - transform

Example configuration for the component

Using the integration receiver:

extensions:
  file_integrations: # <--- This extension is used to discover integrations from a local directory.
    path: "./integrations"

receivers:
  integration/couchbase:
    name: "couchbase" # <--- This is the name of the integration.
    pipelines: [metrics,logs]
    parameters:
      endpoints: [localhost:8091]
      username: Administrator
      password: foobar

exporters:
  debug:
    verbosity: detailed

service:
  extensions: [file_integrations]
  pipelines:
    metrics:
      receivers: [integration/couchbase]
      exporters: [debug]

Using the integration processor:

extensions:
  file_integrations:
    path: "./integrations"

receivers:
  prometheus/couchbase:
    config:
      scrape_configs:
        - job_name: 'couchbase'
          scrape_interval: 5s
          static_configs:
            - targets: [localhost:8091]

processors:
  integration/couchbase:
    name: "couchbase"

exporters:
  debug:
    verbosity: detailed

service:
  extensions: [file_integrations]
  pipelines:
    metrics:
      receivers: [prometheus/couchbase]
      processors: [integration/couchbase]
      exporters: [debug]

Telemetry data types supported

These are generic components that support any data type provided by other components.

Is this a vendor-specific component?

It is not vendor-specific.

Code owners

Elastic?

Sponsor

Elastic?

Additional context

This proposal is a follow-up of open-telemetry/opentelemetry-collector-contrib#26312 and open-telemetry/opentelemetry-collector#8372.

@rogercoll (Contributor) commented

This looks really promising! I think having the templates as a receiver solves two main issues compared to the "Converter" approach:

  • Receiver creator: A template receiver could be included in a receivercreator configuration (see the sketch below).
  • Forward connector: The template receiver does not rely on the collector having the "forward" connector in its pipeline.
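
To illustrate the first point, here is a rough sketch of what nesting the integration receiver under a receivercreator might look like. It reuses the existing receivercreator conventions (rule expressions, backtick endpoint expansion); whether the integration receiver can actually be instantiated this way is exactly what this POC would enable, so treat it as an assumption rather than a verified configuration.

extensions:
  host_observer:

receivers:
  receiver_creator:
    # Start an instance of the integration receiver for every discovered endpoint matching the rule.
    watch_observers: [host_observer]
    receivers:
      integration/couchbase:
        rule: type == "port" && port == 8091
        config:
          name: "couchbase"
          pipelines: [metrics]
          parameters:
            endpoints: ["`endpoint`"]
            username: Administrator
            password: foobar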

@rogercoll (Contributor) left a comment

Thanks for adding this! I wanted to check if a receivercreator would also work; template config:

extensions:
    host_observer:

receivers:
  receiver_creator/2:
    # Name of the extensions to watch for endpoints to start and stop.
    watch_observers: [host_observer]
    receivers:
      httpcheck/on_host:
        # If this rule matches an instance of this receiver will be started.
        rule: type == "port" && port == 8080
        resource_attributes:
          service.name: redis_on_host

processors:
  attributes/example:
    actions:
      - key: account_id
        value: 2245
        action: insert

pipelines:
  extensions: host_observer
  metrics:
    receiver: ""
    processors:
      - attributes/example

For the previous configuration to work we would need to add support for extensions in the configuration. I think that supporting extensions could be added in the future; we can focus on templates with just plain receivers + processors for the moment.

}

type Config struct {
	Receivers map[component.ID]map[string]any `mapstructure:"receivers"`
Contributor

Some receivers, like the receivercreator, rely on extensions, see: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/receivercreator#examples

Could we include the extension component in the config?

Member Author

Added an open question about this to the description. It might make sense for authentication in some cases; for others it might be better to leave users the option to define any extension they want.

Member

Hmm, I'm not sure I get it, but the receivercreator would not be loaded as an integration receiver, right? Hence its dependency on the observer extension does not really matter here?

However, that's a valid point, because we will aim to create technology-specific integrations to cover logs use cases, and then we would most probably need to pair the filelogreceiver with the storage extension. See https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/examples/fault-tolerant-logs-collection/README.md and https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/examples/fault-tolerant-logs-collection/otel-col-config.yaml#L4.

Member Author

Yes, what I mean is that if an integration requires an extension, the integration could expose a variable, and the extension would need to be defined by the user in the config.
This may look like it requires additional configuration, but it also allows for simpler and more composable integrations.

Look, for example, at the POC for an nginx integration that @mrodm prepared.

Templates include placeholders for the storage to use:
https://github.com/elastic/integrations/pull/11253/files#diff-26a3b4eab2c9b845b76591d786a09cd833ac1527288dd2e0c4e2c457fc853521R93

  filelog/access:
    include_file_path: true
    include: ${var:access_paths}
    start_at: beginning
    storage: ${var:storage_resource}

The final user needs to define an extension and set the parameter for the integration: https://github.com/elastic/integrations/pull/11253/files#diff-7187c0dbde58e198aa05659ae047693f9fa1a12fb16cebece7e7ea06602901b1

extensions:
  ...
  file_storage:
    directory: ./data

receivers:
  integration/nginx_otel:
    name: "nginx_otel"
    parameters:
      ...
      storage_resource: file_storage

I think this is a nice approach, because this way the user can reuse a single extension definition for many receivers, whether they are integrations or not (see the sketch below).
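
For illustration, a sketch of that reuse, assuming the same file_storage extension is shared by the nginx_otel integration and by a plain filelog receiver (the filelog/other_app receiver and its log path are hypothetical):

extensions:
  file_storage:
    directory: ./data

receivers:
  integration/nginx_otel:
    name: "nginx_otel"
    parameters:
      storage_resource: file_storage   # the integration receives the extension ID as a parameter
  filelog/other_app:                   # a non-integration receiver reusing the same extension directly
    include: [/var/log/other_app/*.log]
    storage: file_storage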

Member

Thanks for elaborating, that makes sense!

	Pipelines map[component.ID]PipelineConfig `mapstructure:"pipelines"`
}

func (c *Config) Validate() error {
Contributor

Is this function run during the collector's lifetime? I reckon the component's Validate interface implementation is run by the collector at startup. As this is not a component, I think this Validate function won't be run; should we move this check to a custom Unmarshal function instead?

	Processors []component.ID `mapstructure:"processors"`
}

func (p *PipelineConfig) Validate() error {
Contributor

I think this function won't be run

Member Author

I will take a look at this.

@jsoriano changed the title from "POC of template receiver and processor" to "POC of integration receiver and processor" on Sep 19, 2024